// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once
#include "db/memtable.h"
#include "pure_mem/key_index/inline_uk_index.h"
#include "pure_mem/rangearena/arena_handle_event.h"
#include "pure_mem/rangearena/range_arena_rebuild.h"
#include "rocksdb/memtablerep.h"
#include "util/arena.h"

namespace rocksdb {
class MultiRangeArena;
class PureMemRep : public MemTableRep {
 public:
  void AcceptDelete(const char *buf, void *rangearena) override;
  bool AcceptSingleDelete(const char *buf) override;
  void AcceptRangeDelete(const char *buf) override;
  void AcceptDelete(const VersionNode&) override;
  bool AcceptSingleDelete(const VersionNode&) override;
  void AcceptRangeDelete(const VersionNode&) override;

  friend class LookaheadIterator;

  explicit PureMemRep(const MemTableRep::KeyComparator &compare,
                      Allocator *allocator, const SliceTransform *transform,
                      Logger* info_log, const size_t lookahead);

  explicit PureMemRep(const MemTableRep::KeyComparator &compare,
                      Allocator *allocator, const SliceTransform *transform,
                      const ImmutableCFOptions &ioptions, const size_t lookahead);

  // in pure mem mode, no allow call this function.
  KeyHandle Allocate(const size_t len, char** buf) {assert(false); return nullptr;};

  KeyHandle AllocatePure(const size_t len, char **buf, const Slice &key,
                         void **rangearena) override;

  void AllocateOK(const Slice &key, const uint32_t encoded_len, KeyHandle handle, void *rangearena) override;
  // Get multi range arena pointer
  MultiRangeArena *GetMultiRangeArenaPtr() override;

  void Insert(KeyHandle handle) override;

  bool InsertKey(KeyHandle handle) override;

  void InsertWithHint(KeyHandle handle, void **hint) override;

  bool InsertKeyWithHint(KeyHandle handle, void **hint) override;

  void InsertConcurrently(KeyHandle handle) override;

  bool InsertKeyConcurrently(KeyHandle handle) override;

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const char *key) const override;

  size_t ApproximateMemoryUsage() override;

  void Get(const LookupKey &k, void *callback_args,
           bool (*callback_func)(void *arg, const char *entry)) override;

  uint64_t ApproximateNumEntries(const Slice &start_ikey,
                                 const Slice &end_ikey) override;
  ~PureMemRep() override;

  Slice UserKey(const char *key) const override;

  InlineUserKeyIndex<const MemTableRep::KeyComparator &>* GetARTList(){ return &art_list_;};
  // Iteration over the contents of a skip list
  class Iterator : public MemTableRep::Iterator {
    InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator iter_;

   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const InlineUserKeyIndex<const MemTableRep::KeyComparator &> *list)
        : iter_(list) {}

    ~Iterator() override {}

    // Returns true iff the iterator is positioned at a valid node.
    bool Valid() const override { return iter_.Valid(); }

    // Returns the key at the current position.
    // REQUIRES: Valid()
    const char *key() const override { return iter_.key(); }

    // Advances to the next position.
    // REQUIRES: Valid()
    void Next() override { iter_.Next(); }

    // Advances to the previous position.
    // REQUIRES: Valid()
    void Prev() override { iter_.Prev(); }

    // Advance to the first entry with a key >= target
    void Seek(const Slice &user_key, const char *memtable_key) override {
      if (memtable_key != nullptr) {
        iter_.Seek(memtable_key);
      } else {
        iter_.Seek(EncodeKey(&tmp_, user_key));
      }
    }

    // Retreat to the last entry with a key <= target
    void SeekForPrev(const Slice &user_key, const char *memtable_key) override {
      if (memtable_key != nullptr) {
        iter_.SeekForPrev(memtable_key);
      } else {
        iter_.SeekForPrev(EncodeKey(&tmp_, user_key));
      }
    }

    // Position at the first entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToFirst() override { iter_.SeekToFirst(); }

    // Position at the last entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToLast() override { iter_.SeekToLast(); }

   protected:
    std::string tmp_;  // For passing to EncodeKey
  };

  // Iterator over the contents of a skip list which also keeps track of the
  // previously visited node. In Seek(), it examines a few nodes after it
  // first, falling back to O(log n) search from the head of the list only if
  // the target key hasn't been found.
  class LookaheadIterator : public MemTableRep::Iterator {
   public:
    explicit LookaheadIterator(const PureMemRep &rep)
        : rep_(rep), iter_(&rep_.art_list_), prev_(iter_) {}

    ~LookaheadIterator() override {}

    bool Valid() const override { return iter_.Valid(); }

    const char *key() const override {
      assert(Valid());
      return iter_.key();
    }

    void Next() override {
      assert(Valid());

      bool advance_prev = true;
      if (prev_.Valid()) {
        auto k1 = rep_.UserKey(prev_.key());
        auto k2 = rep_.UserKey(iter_.key());

        if (k1.compare(k2) == 0) {
          // same user key, don't move prev_
          advance_prev = false;
        } else if (rep_.transform_) {
          // only advance prev_ if it has the same prefix as iter_
          auto t1 = rep_.transform_->Transform(k1);
          auto t2 = rep_.transform_->Transform(k2);
          advance_prev = t1.compare(t2) == 0;
        }
      }

      if (advance_prev) {
        prev_ = iter_;
      }
      iter_.Next();
    }

    void Prev() override {
      assert(Valid());
      iter_.Prev();
      prev_ = iter_;
    }

    void Seek(const Slice &internal_key, const char *memtable_key) override {
      const char *encoded_key = (memtable_key != nullptr)
                                ? memtable_key
                                : EncodeKey(&tmp_, internal_key);

      if (prev_.Valid() && rep_.cmp_(encoded_key, prev_.key()) >= 0) {
        // prev_.key() is smaller or equal to our target key; do a quick
        // linear search (at most lookahead_ steps) starting from prev_
        iter_ = prev_;

        size_t cur = 0;
        while (cur++ <= rep_.lookahead_ && iter_.Valid()) {
          if (rep_.cmp_(encoded_key, iter_.key()) <= 0) {
            return;
          }
          Next();
        }
      }

      iter_.Seek(encoded_key);
      prev_ = iter_;
    }

    void SeekForPrev(const Slice &internal_key,
                     const char *memtable_key) override {
      const char *encoded_key = (memtable_key != nullptr)
                                ? memtable_key
                                : EncodeKey(&tmp_, internal_key);
      iter_.SeekForPrev(encoded_key);
      prev_ = iter_;
    }

    void SeekToFirst() override {
      iter_.SeekToFirst();
      prev_ = iter_;
    }

    void SeekToLast() override {
      iter_.SeekToLast();
      prev_ = iter_;
    }

   protected:
    std::string tmp_;  // For passing to EncodeKey

   private:
    const PureMemRep &rep_;
    InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator iter_;
    InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator prev_;
  };

  class HashIterator : public MemTableRep::Iterator {
      InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator iter_;

    public:
        // Initialize an iterator over the specified list.
        // The returned iterator is not valid.
        explicit HashIterator(const InlineUserKeyIndex<const MemTableRep::KeyComparator &> *list)
                : iter_(list) {}

        ~HashIterator() override {}

        // Returns true iff the iterator is positioned at a valid node.
        bool Valid() const override { return iter_.Valid(); }

        // Returns the key at the current position.
        // REQUIRES: Valid()
        const char *key() const override { return iter_.key(); }

        // Advances to the next position.
        // REQUIRES: Valid()
        void Next() override { iter_.Next(); }

        // Advances to the previous position.
        // REQUIRES: Valid()
        void Prev() override { iter_.Prev(); }

        // Advance to the first entry with a key >= target
        void Seek(const Slice &user_key, const char *memtable_key) override {
            if (memtable_key != nullptr) {
                iter_.HashSeek(memtable_key);
            } else {
                iter_.HashSeek(EncodeKey(&tmp_, user_key));
            }
        }

        // Retreat to the last entry with a key <= target
        void SeekForPrev(const Slice &user_key, const char *memtable_key) override {
            if (memtable_key != nullptr) {
                iter_.HashSeekForPrev(memtable_key);
            } else {
                iter_.HashSeekForPrev(EncodeKey(&tmp_, user_key));
            }
        }

        // Position at the first entry in list.
        // Final state of iterator is Valid() iff list is not empty.
        void SeekToFirst() override { iter_.SeekToFirst(); }

        // Position at the last entry in list.
        // Final state of iterator is Valid() iff list is not empty.
        void SeekToLast() override { iter_.SeekToLast(); }

    protected:
        std::string tmp_;  // For passing to EncodeKey
    };

  MemTableRep::Iterator *GetIterator(Arena *arena = nullptr) override {
    if (lookahead_ > 0) {
      void *mem =
          arena ? arena->AllocateAligned(sizeof(PureMemRep::LookaheadIterator))
                :
          operator new(sizeof(PureMemRep::LookaheadIterator));
      return new(mem) PureMemRep::LookaheadIterator(*this);
    } else {
      void *mem = arena ? arena->AllocateAligned(sizeof(PureMemRep::Iterator)) :
                  operator new(sizeof(PureMemRep::Iterator));
      return new(mem) PureMemRep::Iterator(&art_list_);
    }
  }


    MemTableRep::Iterator *GetHashIterator(Arena *arena = nullptr) override {
        void *mem = arena ? arena->AllocateAligned(sizeof(PureMemRep::HashIterator)) :
                    operator new(sizeof(PureMemRep::HashIterator));
        return new(mem) PureMemRep::HashIterator(&art_list_);
    }

 private:
  const MemTableRep::KeyComparator &cmp_;
  InlineUserKeyIndex<const MemTableRep::KeyComparator &> art_list_;
  const SliceTransform *transform_{nullptr};
  const size_t lookahead_{0};
  MultiRangeArena* multi_range_arena_{nullptr};
  Logger* info_log_;
};

}  // namespace rocksdb
